package gu.simplemq;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;

import static com.google.common.base.Preconditions.*;

import gu.simplemq.exceptions.SmqUnsubscribeException;
import gu.simplemq.utils.CommonUtils;

/**
 * Definition of a (message) channel.<br>
 * A channel here may be a channel of the publish/subscribe model or a queue of the
 * producer/consumer model.
 * @author guyadong
 *
 * @param <T> type of the message data carried by the channel
 */
public class Channel<T> implements IMessageAdapterExt<T>, Cloneable,Constant {
	/** Channel name (message source). */
	public final String name;
	/** Message data type associated with the channel. */
	public final Type type;
	/** Whether this instance may still be modified; see {@link #checkMutable()}. */
	private boolean mutable = true;
	/** Flag exposed through {@link #isAutoack()}; defaults to {@code true}. */
	private boolean autoack = true;
	/** Message handler that the {@code onSubscribe} methods delegate to; may be {@code null}. */
	private IMessageAdapter<T> adapter;
	/**
	 * Listeners invoked by {@link #onUnregisted()} when this channel is unregistered.<br>
	 * The field is not {@code final} only so that {@link #clone()} can give every copy its own set;
	 * it is never reassigned after the owning instance has been handed out, so it remains a valid
	 * lock object.
	 */
	private LinkedHashSet<IUnregistedListener<T>> unregistedListeners = Sets.newLinkedHashSet();

	/**
	 * Creates a channel without a message handler.
	 * @param name channel name, must not be {@code null} or empty
	 * @param clazz message data type
	 * @throws IllegalArgumentException if {@code name} is {@code null}/empty or {@code clazz} is {@code null}
	 */
	public Channel(String name, Class<T> clazz) {
		this(name,(Type)clazz);
	}

	/**
	 * Resolves the raw class of a type.
	 * @param type a {@link Class} or a {@link ParameterizedType}
	 * @return {@code type} itself when it is a class, otherwise the raw type of the parameterized type
	 * @throws IllegalArgumentException if {@code type} is neither of the two supported kinds
	 */
	private static Class<?> getRawClass(Type type){
		if(type instanceof Class<?>){
			return (Class<?>) type;
		} else if(type instanceof ParameterizedType){
			return getRawClass(((ParameterizedType) type).getRawType());
		} else{
			throw new IllegalArgumentException("invalid type");
		}
	}

	/**
	 * Creates a channel whose message type is read from the generic superclass of the
	 * (usually anonymous) subclass being instantiated.<br>
	 * usage:<pre>new Channel&lt;Model&gt;("name"){};</pre>
	 * Only the raw class of the type argument is kept in {@link #type}.
	 * @param name channel name, must not be {@code null} or empty
	 * @throws IllegalArgumentException if {@code name} is {@code null}/empty, if the subclass does not
	 *         directly extend a parameterized {@code Channel}, or if the type argument is neither a
	 *         class nor a parameterized type
	 */
	protected Channel(String name){
		checkArgument(!Strings.isNullOrEmpty(name), "name is null or empty");
		this.name = name;
		Type superClass = getClass().getGenericSuperclass();
		// A raw or indirect subclass has no type argument to read; fail with a clear message
		// instead of an unexplained ClassCastException.
		checkArgument(superClass instanceof ParameterizedType,
				"type argument required,usage: new Channel<Model>(\"name\"){}");
		this.type = getRawClass(((ParameterizedType) superClass).getActualTypeArguments()[0]);
	}

	/**
	 * Same as {@link #Channel(String)} with an initial message handler.<br>
	 * usage:<pre>new Channel&lt;Model&gt;("name",handle){};</pre>
	 * @param name channel name, must not be {@code null} or empty
	 * @param handle message handler, may be {@code null}
	 */
	protected Channel(String name,IMessageAdapter<T> handle){
		this(name);
		this.adapter = handle;
	}

	/**
	 * Creates a channel without a message handler.
	 * @param name channel name, must not be {@code null} or empty
	 * @param type message data type, a {@link Class} or a {@link ParameterizedType}
	 * @throws IllegalArgumentException if {@code name} is {@code null}/empty or {@code type} is of an unsupported kind
	 */
	public Channel(String name, Type type) {
		super();
		checkArgument(!Strings.isNullOrEmpty(name), "name is null or empty");
		this.name = name;
		checkArgument((type instanceof ParameterizedType) || (type instanceof Class<?>),
				"invalid type of 'type',ParameterizedType or Class required");
		this.type = type;
	}

	/**
	 * Creates a channel with an initial message handler.
	 * @param name channel name, must not be {@code null} or empty
	 * @param type message data type, a {@link Class} or a {@link ParameterizedType}
	 * @param handle message handler, may be {@code null}
	 */
	public Channel(String name, Type type,IMessageAdapter<T> handle) {
		this(name, type);
		this.adapter = handle;
	}

	/**
	 * Creates a channel with an initial message handler.
	 * @param name channel name, must not be {@code null} or empty
	 * @param clazz message data type
	 * @param handle message handler, may be {@code null}
	 */
	public Channel(String name, Class<T> clazz,IMessageAdapter<T> handle) {
		this(name,(Type)clazz,handle);
	}

	/**
	 * Proxy for {@link #adapter}: validates the arguments before delegating.<br>
	 * Nothing happens when no adapter is set or the message is {@code null}.
	 * {@link SmqUnsubscribeException} is propagated to the caller; any other throwable raised by
	 * the adapter is logged and swallowed.
	 * @param t received message
	 * @throws SmqUnsubscribeException rethrown unchanged from the adapter
	 * @see gu.simplemq.IMessageAdapter#onSubscribe(java.lang.Object)
	 */
	@Override
	public void onSubscribe(T t) throws SmqUnsubscribeException {
		if(null == this.adapter || null == t){
			return;
		}
		try{
			this.adapter.onSubscribe(t);
		}catch(SmqUnsubscribeException e){
			throw e;
		}catch(Throwable e){
			logger.error(e.getMessage(),e);
		}
	}

	/**
	 * Proxy for {@link #adapter}: validates the arguments before delegating.<br>
	 * Nothing happens when no adapter is set or the message is {@code null}. Otherwise the context is
	 * installed through {@link SimplemqContext#setCurrentContext(SimplemqContext)} and the message is
	 * passed to the context-aware method when the adapter is an {@link IMessageAdapterExt}, or to the
	 * plain one-argument method when it is not.
	 * {@link SmqUnsubscribeException} is propagated to the caller; any other throwable raised by
	 * the adapter is logged and swallowed.
	 * @param t received message
	 * @param context context passed along with the message
	 * @throws SmqUnsubscribeException rethrown unchanged from the adapter
	 * @see gu.simplemq.IMessageAdapterExt#onSubscribe(java.lang.Object,SimplemqContext)
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Override
	public void onSubscribe(T t,SimplemqContext context) throws SmqUnsubscribeException {
		if(null == this.adapter || null == t){
			return;
		}
		SimplemqContext.setCurrentContext(context);
		try{
			if(this.adapter instanceof IMessageAdapterExt) {
				((IMessageAdapterExt)this.adapter).onSubscribe(t,context);
			}else {
				this.adapter.onSubscribe(t);
			}
		}catch(SmqUnsubscribeException e){
			throw e;
		}catch(Throwable e){
			logger.error(e.getMessage(),e);
		}
	}

	/**
	 * Called when this channel is unregistered.<br>
	 * Every listener is removed from the set and then invoked with this channel, in insertion
	 * order, so each listener fires at most once.
	 * @see #addUnregistedListener(IUnregistedListener)
	 */
	public void onUnregisted(){
		synchronized(unregistedListeners){
			IUnregistedListener<T> listener;
			for(Iterator<IUnregistedListener<T>> itor = unregistedListeners.iterator();itor.hasNext();){
				listener = itor.next();
				// remove before invoking so the listener is gone even if apply() throws
				itor.remove();
				listener.apply(this);
			}
		}
	}

	/**
	 * Adds a listener to be invoked when this channel is unregistered.
	 * @param unregistedListener listener to add, must not be {@code null}
	 * @return this instance
	 * @throws IllegalStateException if this instance is immutable
	 * @throws NullPointerException if {@code unregistedListener} is {@code null}
	 */
	public Channel<T> addUnregistedListener(IUnregistedListener<T> unregistedListener) {
		checkMutable();
		synchronized(this.unregistedListeners){
			this.unregistedListeners.add(checkNotNull(unregistedListener));
		}
		return this;
	}

	/**
	 * @return the message handler, possibly {@code null}
	 */
	public IMessageAdapter<T> getAdapter() {
		return adapter;
	}

	/**
	 * Replaces the message handler.
	 * @param adapter the handler to set, may be {@code null}
	 * @return this instance
	 * @throws IllegalStateException if this instance is immutable
	 */
	public Channel<T> setAdapter(IMessageAdapter<T> adapter) {
		checkMutable();
		this.adapter = adapter;
		return this;
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append("Channel [name=").append(name).append(", type=").append(type).append("]");
		return builder.toString();
	}

	/** Hash code derived from {@link #name} and {@link #type} only, consistent with {@link #equals(Object)}. */
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((name == null) ? 0 : name.hashCode());
		result = prime * result + ((type == null) ? 0 : type.hashCode());
		return result;
	}

	/**
	 * Two channels are equal when their {@link #name} and {@link #type} are equal;
	 * the adapter, listeners and flags are not compared.
	 */
	@Override
	public boolean equals(Object obj) {
		if(super.equals(obj)){
			return true;
		}
		if(!(obj instanceof Channel)){
			return false;
		}
		Channel<?> other = (Channel<?>)obj;
		return Objects.equal(name, other.name)
				&& Objects.equal(type, other.type);
	}

	/**
	 * Stricter variant of {@link #equals(Object)} that additionally requires both channels to
	 * reference the very same adapter instance.
	 * @param obj object to compare with
	 * @return {@code true} if {@code obj} is an equal channel with the identical adapter
	 */
	public boolean equalsFull(Object obj) {
		// equals() has already rejected non-Channel arguments, so the cast is safe
		return equals(obj) && (adapter == ((Channel<?>)obj).adapter);
	}

	/**
	 * Returns a copy of this channel.<br>
	 * The copy shares the adapter reference but owns an independent snapshot of the unregister
	 * listeners, so adding or draining listeners on one instance never affects the other
	 * (in particular a mutable copy cannot modify an immutable original).
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Channel<T> clone() {
		try {
			Channel<T> copy = (Channel<T>) super.clone();
			// Object.clone() is shallow: without this the copy would alias our listener set
			synchronized(this.unregistedListeners){
				copy.unregistedListeners = new LinkedHashSet<IUnregistedListener<T>>(this.unregistedListeners);
			}
			return copy;
		} catch (CloneNotSupportedException e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * Marks this instance as immutable.
	 * @return this instance
	 */
	public Channel<T> immutable(){
		return mutable(false);
	}

	/**
	 * Checks that this instance is still modifiable.
	 * @throws IllegalStateException if this instance is immutable
	 */
	private void checkMutable(){
		checkState(this.mutable,"immutable instance can't be modified");
	}

	/** Sets the mutable flag and returns this instance. */
	private Channel<T> mutable(boolean mutable){
		this.mutable = mutable;
		return this;
	}

	/**
	 * Returns a new, mutable instance cloned from this one.
	 * @return a modifiable copy; this instance is left untouched
	 */
	public Channel<T> asMutable(){
		return this.clone().mutable(true);
	}

	/**
	 * Sets the auto-ack flag. Unlike the other setters this one is not guarded by the
	 * mutable check.
	 * @param autoack new flag value
	 * @return this instance
	 */
	public Channel<T> setAutoack(boolean autoack){
		this.autoack = autoack;
		return this;
	}

	/**
	 * @return the auto-ack flag
	 */
	public boolean isAutoack() {
		return autoack;
	}

	/**
	 * Collects the distinct names of the given channels.
	 * @param channels channels to read, filtered through {@code CommonUtils.cleanNullAsList} first
	 * @return array of distinct channel names, in no particular order
	 */
	public static String[] getChannelNames(Channel<?>... channels) {
		return getChannelNamesAsList(channels).toArray(new String[0]);
	}

	/**
	 * Collects the distinct names of the given channels.
	 * @param channels channels to read, filtered through {@code CommonUtils.cleanNullAsList} first
	 * @return set of distinct channel names
	 */
	public static Set<String> getChannelNames(Collection<Channel<?>> channels) {
		HashSet<String> names = new HashSet<String>();
		for (Channel<?> ch : CommonUtils.cleanNullAsList(channels)) {
			names.add(ch.name);
		}
		return names;
	}

	/**
	 * Collects the distinct names of the given channels; despite the method name the result is a set.
	 * @param channels channels to read, filtered through {@code CommonUtils.cleanNullAsList} first
	 * @return set of distinct channel names
	 */
	public static Set<String> getChannelNamesAsList(Channel<?>... channels) {
		HashSet<String> names = new HashSet<String>();
		for (Channel<?> ch : CommonUtils.cleanNullAsList(channels)) {
			names.add(ch.name);
		}
		return names;
	}
}
